package com.deepuser.idmapping.api;

import com.alibaba.fastjson.JSON;
import com.deepuser.mq.kafka.utils.KafkaUtil;
import com.deepuser.response.BaseResponse;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
@RestController
@RequestMapping("/mq/kafka")
@Api(tags = "数据接入")
public class KafkaController {

    @Value("#{'${deepuser.topic}'.split(',')}")
    private List<String> topicList;  //不判断空

    ExecutorService threadPool = Executors.newFixedThreadPool(4);

    @PostMapping("/sendMsg")
    @ApiOperation(value = "发送消息到主题")
    public BaseResponse sendMsg(@RequestBody HashMap map1) {
        try{
            String topic = map1.get("topic").toString();
            if(!topicList.contains(topic)){
                log.error("入参TOPIC错误");
                return BaseResponse.fail("入参TOPIC错误");
            }
            map1.remove("topic");
            String msg  = JSON.toJSONString(map1);
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    KafkaUtil.sendTopicMessage(topic,msg);
                }
            });
            //假的返回值
            return BaseResponse.ok("success!");
        }catch (Exception e){
            log.error("系统异常-->{}",e.getMessage());
            return BaseResponse.fail(e.getMessage());
        }
    }
}
